package com.atguigu.bigdata.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf



// Read an HBase table into an RDD; the commented-out block below writes an RDD back to HBase
object Spark02_Hbase15 {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the Spark execution environment
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark02_Hbase15")
    // Create the Spark context
    val sc = new SparkContext(config)

    // Build the HBase configuration: ZooKeeper quorum and the table to scan
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    conf.set(TableInputFormat.INPUT_TABLE, "student")

    // Read the HBase table into an RDD of (rowkey, Result) pairs
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Print every cell value; note that with foreach this println runs on the executors
    hbaseRDD.foreach {
      case (rowkey, result) =>
        for (cell <- result.rawCells()) {
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
    }
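
    // A minimal alternative sketch, assuming an "info:name" column exists (as written
    // by the commented-out block below): map each Result to (rowkey, name) strings and
    // collect to the driver so the output prints locally rather than on the executors.
    val nameRDD: RDD[(String, String)] = hbaseRDD.map {
      case (_, result) =>
        val name = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
          .map(b => Bytes.toString(b))
          .getOrElse("") // tolerate rows that lack the column
        (Bytes.toString(result.getRow), name)
    }
    nameRDD.collect().foreach(println)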


    // Insert data: build (rowkey, Put) pairs and write them to HBase via the old (mapred) TableOutputFormat
    /* val dataRDD = sc.makeRDD(List(("1004", "xiaokeai"), ("1005", "woaini"), ("1006", "xiaobendan")))

    // Wrap each record in a Put keyed by its rowkey, writing to the "info:name" column
    val putRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map {
      case (rowkey, name) =>
        val put = new Put(Bytes.toBytes(rowkey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
    }

    // Use a separate configuration for the write so it does not shadow the read-side conf above
    val writeConf: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(writeConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    putRDD.saveAsHadoopDataset(jobConf) */
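
    // A sketch (an assumption, not part of the original) of the same write through the
    // new (mapreduce) API: saveAsNewAPIHadoopDataset, using the fully qualified
    // mapreduce TableOutputFormat so it does not clash with the mapred import above.
    /* val newConf: Configuration = HBaseConfiguration.create()
    newConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    newConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, "student")
    val job = org.apache.hadoop.mapreduce.Job.getInstance(newConf)
    job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    putRDD.saveAsNewAPIHadoopDataset(job.getConfiguration) */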

    sc.stop()
  }
}
